package com.xiaofan.apitest.window

import com.xiaofan.apitest.source.SensorReading
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

object ProcessFunctionTest {
  def main(args: Array[String]): Unit = {

    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    val inputStream: DataStream[String] = env.socketTextStream("192.168.1.27", 9999)

    val dataStream: DataStream[SensorReading] = inputStream.map(
      data => {
        val arr: Array[String] = data.split(",")
        SensorReading(arr(0), arr(1).toLong, arr(2).toDouble)
      }
    )

    // 需求：如果传感器 10秒内连续上升，则需要报警
    val warningStream: DataStream[String] = dataStream
      .keyBy(_.id)
      .process(new TempIncrementWarning(10000L))

    warningStream.print()

    env.execute("process function test")

  }
}
/**
 * 自定义的keyedProcessFunction
 */
class TempIncrementWarning(interval: Long) extends KeyedProcessFunction[String, SensorReading, String] {

  // 定义状态，保存上一个温度值进行比较，保存注册定时器的时间戳，用于删除
  lazy val lastTempState: ValueState[Double] = getRuntimeContext.getState(new ValueStateDescriptor[Double]("last-temp", classOf[Double]))
  lazy val timerTsState: ValueState[Long] = getRuntimeContext.getState(new ValueStateDescriptor[Long]("timer-ts", classOf[Long]))
  lazy val noRepeat: ValueState[Boolean] = getRuntimeContext.getState(new ValueStateDescriptor[Boolean]("open-status", classOf[Boolean]))


  override def processElement(value: SensorReading, ctx: KeyedProcessFunction[String, SensorReading, String]#Context, out: Collector[String]): Unit = {
    // 获取状态
    val lastTemp: Double = lastTempState.value()
    val timerTs: Long = timerTsState.value()

    // 当前温度和上次温度进行比较
    if (value.temperature > lastTemp) {
      if (noRepeat.value()) {
        // 如果温度上升，且没有定时器，那么注册当前时间10s之后的定时器
        val ts: Long = ctx.timerService().currentProcessingTime() + interval
        ctx.timerService().registerProcessingTimeTimer(ts)
        timerTsState.update(ts)
        noRepeat.update(false)
      }
    } else {
      // 如果温度下降，那么删除定时器
      ctx.timerService().deleteProcessingTimeTimer(timerTs)
      timerTsState.clear()
      noRepeat.update(true)
    }

    // 更新温度值
    lastTempState.update(value.temperature)
  }

  override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[String, SensorReading, String]#OnTimerContext, out: Collector[String]): Unit = {
    out.collect("传感器 " + ctx.getCurrentKey + "的温度连续" + interval / 1000 + "秒连续上升")
    noRepeat.update(true)
  }
}

